package com.xiaojiezhu.spark.rdd.wordcount;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Word-count example built on the Spark Java RDD API.
 *
 * <p>Reads {@code in/text.txt} below a hard-coded base directory, splits every line on a
 * single space, counts how often each resulting token occurs, and saves the
 * {@code (word, count)} pairs as text below {@code result} in the same base directory.
 *
 * <p>Created: 2017-09-23 22:31
 *
 * @author 朱小杰
 */
public class JavaWordCount {

    /**
     * Runs the word-count job with a {@code local} master.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        String dir = "D:/spark/workcount/";
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordCount");

        // JavaSparkContext is Closeable: try-with-resources guarantees the context is
        // stopped when the job finishes and also when any step below throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> input = sc.textFile(dir + "in/text.txt");

            // Split each line into words (separator: a single space).
            JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });
            // Java 8 lambda form, equivalent to the above:
            //JavaRDD<String> words = input.flatMap(s -> Arrays.asList(((String) s).split(" ")).iterator());

            // Map every word to a (word, 1) pair, then sum the ones per word.
            JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer x, Integer y) throws Exception {
                    return x + y;
                }
            });
            // Java 8 lambda form, equivalent to the above:
            //JavaPairRDD<String, Integer> counts = words.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey((x, y) -> x + y);

            // NOTE(review): presumably fails if the "result" directory already exists — confirm
            // against the Hadoop output-format behaviour in use.
            counts.saveAsTextFile(dir + "result");
        }
    }
}
